盘点Flink实战踩过的坑
数据倾斜导致子任务积压
业务背景
一个流程中,有两个重要子任务:一是数据迁移,将kafka实时数据落Es,二是将kafka数据做窗口聚合落hbase,两个子任务接的是同一个Topic GroupId。上游 Topic 的 tps 高峰达到5-6w。
问题描述
给 24个 TaskManager(CPU) 都会出现来不及消费的情况
问题原因
做窗口聚合的任务的分组字段,分组粒度太小,hash不能打散,数据倾斜严重,导致少数 TaskManager 上压力过大,从而影响落Es的效率,导致背压。
解决方式
将两个任务独立开来,作为不同的流程。
结果
修改之前 24个 TaskManager(CPU) 来不及消费,改完之后 20 个 CPU 可完成任务。Kafka实时数据落Es的16个TaskManager,将kafka数据做窗口聚合落hbase的4个TaskManager。
另:
同样的数据、同样的Tps作为数据输入,Hbase的输出能力远超过Es,考虑实时任务落数据进Es要慎重。
Flink任务落Es时要考虑设置微批落数据,设置 bulk.flush.max.actions 和 bulk.flush.interval.ms至合适值,否则影响吞吐量。
Kafka 消息大小默认配置太小,导致数据未处理
业务背景
正常的Flink任务消费 Topic 数据,但是Topic中的数据为 XML 以及 JSON,单条数据较大
问题描述
Flink各项metrics指标正常,但是没处理到数据
问题原因
Topic中单条数据 > 1M,超过 Kafka Consumer 处理单条数据的默认最大值。
解决方式
有三种可选方式:扩大kafka consumer 单条数据的数据大小:fetch.message.max.bytes。对消息进行压缩:上游 kafka producer 设置 compression.codec 和 commpressed.topics。业务上对数据切片,在上游 kafka producer 端将数据切片为 10K,使用分区主键确保同一条数据发送到同一Partition,consumer对消息重组。
结果
方式一:按业务要求扩大 Kafka Consumer 可处理的单条数据字节数即可正常处理业务 方式二:Kafka Consumer 需先解码,再进行业务处理。方式三:Kafka Consumer 需先重组数据,再进行业务处理。
Tps 很大,Kafka Ack 默认配置 拖慢消息处理速度
业务背景
实时任务,上游接流量页面点击事件的数据,下游输出Kafka,输出tps很大。流量数据不重要,可接受丢失的情况
问题描述
CPU资源耗费较多的情况下,才能正常消费,考虑如果缩减资源。
问题原因
Kafka Producer 默认 acks=1,即Partition Leader接收到消息而且写入本地磁盘了,就认为成功了
解决方式
Kafka Producer 设置 :props.put(“acks”, “0”); 将 acks=0,即KafkaProducer在客户端,只要把消息发送出去,不管那条数据有没有在哪怕Partition Leader上落到磁盘,直接就认为这个消息发送成功了。
结果
资源降低三分之一。
The assigned slot container_e08_1539148828017_15937_01_003564_0 was removed.
org.apache.flink.util.FlinkException: The assigned slot container_e08_1539148828017_15937_01_003564_0 was removed. at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:786) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:756) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:948) at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:372) at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:803) at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:340) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
程序内存占用过大,导致TaskManager在yarn上kill了,分析原因应该是资源不够,可以将程序放在资源更大的集群上,再不行就设置减少Slot中共享的task的个数,也可能是内存泄露或内存资源配置不合理造成,需要进行合理分配。
The heartbeat of TaskManager with id container ....... timed out
此错误是container心跳超时,出现此种错误一般有两种可能:
1、分布式物理机网络失联,这种原因一般情况下failover后作业能正常恢复,如果出现的不频繁可以不用关注;2、failover的节点对应TM的内存设置太小,GC严重导致心跳超时,建议调大对应节点的内存值。
Caused by: java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@flink88:15265/user/taskmanager_0#6643546564]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation".
在flink-conf.yaml中添加或修改:akka.ask.timeout: 100s web.timeout: 100000
Checkpoint:Checkpoint expired before completing
checkpointConf.setCheckpointTimeout(5000L)这个值设置过小,默认是10min,需要进行调大测试。
Kafka partition leader切换导致Flink重启
Flink重启,查看日志,显示:
java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition. at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:280) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
查看Kafka的Controller日志,显示:
INFO [SessionExpirationListener on 10], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
关于producer参数设置,设置retries参数,可以在Kafka的Partition发生leader切换时,Flink不重启,而是做3次尝试:
kafkaProducerConfig
{
"bootstrap.servers": "192.169.2.20:9093,192.169.2.21:9093,192.169.2.22:9093"
"retries":3
}
注意 mapWithState & TTL 的重要性
在处理包含无限多键的数据时,要考虑到 keyed 状态保留策略(通过 TTL 定时器来在给定的时间之后清理未使用的数据)是很重要的。术语『无限』在这里有点误导,因为如果你要处理的 key 以 128 位编码,则 key 的最大数量将会有个限制(等于 2 的 128 次方)。但这是一个巨大的数字!你可能无法在状态中存储那么多值,所以最好考虑你的键空间是无界的,同时新键会随着时间不断出现。
如果你的 keyed 状态包含在某个 Flink 的默认窗口中,则将是安全的:即使未使用 TTL,在处理窗口的元素时也会注册一个清除计时器,该计时器将调用 clearAllState 函数,并删除与该窗口关联的状态及其元数据。
如果要使用 Keyed State Descriptor 来管理状态,可以很方便地添加 TTL 配置,以确保在状态中的键数量不会无限制地增加。
但是,你可能会想使用更简便的 mapWithState 方法,该方法可让你访问 valueState 并隐藏操作的复杂性。虽然这对于测试和少量键的数据来说是很好的选择,但如果在生产环境中遇到无限多键值时,会引发问题。由于状态是对你隐藏的,因此你无法设置 TTL,并且默认情况下未配置任何 TTL。这就是为什么值得考虑做一些额外工作的原因,如声明诸如 RichMapFunction 之类的东西,这将使你能更好的控制状态的生命周期。
部署和资源问题
(0) JDK版本过低
这不是个显式错误,但是JDK版本过低很有可能会导致Flink作业出现各种莫名其妙的问题,因此在生产环境中建议采用JDK 8的较高update(我们使用的是181)。
(1) Could not build the program from JAR file
该信息不甚准确,因为绝大多数情况下都不是JAR包本身有毛病,而是在作业提交过程中出现异常退出了。因此需要查看本次提交产生的客户端日志(默认位于$FLINK_HOME/logs目录下),再根据其中的信息定位并解决问题。
(2) ClassNotFoundException/NoSuchMethodError/IncompatibleClassChangeError/...
一般都是因为用户依赖第三方包的版本与Flink框架依赖的版本有冲突导致。
(3) Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
就是字面意思,YARN集群内没有足够的资源启动Flink作业。检查一下当前YARN集群的状态、正在运行的YARN App以及Flink作业所处的队列,释放一些资源或者加入新的资源。
(4) java.util.concurrent.TimeoutException: Slot allocation request timed out
slot分配请求超时,是因为TaskManager申请资源时无法正常获得,按照上一条的思路检查即可。
(5) org.apache.flink.util.FlinkException: The assigned slot < container_id> was removed
TaskManager的Container因为使用资源超限被kill掉了。首先需要保证每个slot分配到的内存量足够,特殊情况下可以手动配置SlotSharingGroup来减少单个slot中共享Task的数量。如果资源没问题,那么多半就是程序内部发生了内存泄露。建议仔细查看TaskManager日志,并按处理JVM OOM问题的常规操作来排查。
(6) java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id < tm_id>timed out
TaskManager心跳超时。有可能是TaskManager已经失败,如果没有失败,那么有可能是因为网络不好导致JobManager没能收到心跳信号,或者TaskManager忙于GC,无法发送心跳信号。JobManager会重启心跳超时的TaskManager,如果频繁出现此异常,应该通过日志进一步定位问题所在。
在Flink中,资源的隔离是通过Slot进行的,也就是说多个Slot会运行在同一个JVM中,这种隔离很弱,尤其对于生产环境。Flink App上线之前要在一个单独的Flink集群上进行测试,否则一个不稳定、存在问题的Flink App上线,很可能影响整个Flink集群上的App。
(7)资源不足导致 container 被 kill
The assigned slot container_container编号 was removed.
Flink App 抛出此类异常,通过查看日志,一般就是某一个 Flink App 内存占用大,导致 TaskManager(在 Yarn 上就是 Container )被Kill 掉。
但是并不是所有的情况都是这个原因,还需要进一步看 yarn 的日志( 查看 yarn 任务日志:yarn logs -applicationId -appOwner),如果代码写的没问题,就确实是资源不够了,其实 1G Slot 跑多个Task( Slot Group Share )其实挺容易出现的。
因此有两种选择,可以根据具体情况,权衡选择一个。
将该 Flink App 调度在 Per Slot 内存更大的集群上。通过 slotSharingGroup("xxx") ,减少 Slot 中共享 Task 的个数
(8)启动报错,提示找不到 jersey 的类
java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties 解决办法进入 yarn中 把 lib 目中的一下两个问价拷贝到flink的lib中 hadoop/share/hadoop/yarn/lib/jersey-client-1.9.jar /hadoop/share/hadoop/yarn/lib/jersey-core-1.9.jar
(9)Scala版本冲突
java.lang.NoSuchMethodError:scala.collection.immutable.HashSet$.empty()Lscala/collection/ 解决办法,添加: import org.apache.flink.api.scala._
(10)没有使用回撤流报错
Table is not an append一only table. Use the toRetractStream() in order to handle add and retract messages. 这个是因为动态表不是 append-only 模式的,需要用 toRetractStream ( 回撤流) 处理就好了.
tableEnv.toRetractStreamPerson.print()
(11)OOM 问题解决思路 java.lang.OutOfMemoryError: GC overhead limit exceeded java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOfRange(Arrays.java:3664) at java.lang.String.(String.java:207) at com.esotericsoftware.kryo.io.Input.readString(Input.java:466) at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:177) ...... at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
解决方案:
检查 slot 槽位够不够或者 slot 分配的数量有没有生效。程序起的并行是否都正常分配了(会有这样的情况出现,假如 5 个并行,但是只有 2 个在几点上生效了,另外 3 个没有数据流动)。检查flink程序有没有数据倾斜,可以通过 flink 的 ui 界面查看每个分区子节点处理的数据量。
(12)解析返回值类型失败报错
The return type of function could not be determined automatically Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(RemoteEnvironmentTest.java:27)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface. at org.apache.flink.api.java.DataSet.getType(DataSet.java:178) at org.apache.flink.api.java.DataSet.collect(DataSet.java:410) at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
解决方案:产生这种现象的原因一般是使用 lambda 表达式没有明确返回值类型,或者使用特使的数据结构 flink 无法解析其类型,这时候我们需要在方法的后面添加返回值类型,比如字符串。
input.flatMap((Integer number, Collector< String> out) -> { ...... }) // 提供返回值类型 .returns(Types.STRING)
(13)Hadoop jar 包冲突
Caused by: java.io.IOException: The given file system URI (hdfs:///data/checkpoint-data/abtest) did not describe the authority (like for example HDFS NameNode address/port or S3 host). The attempt to use a configured default authority failed: Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS'). at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:135) at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399) at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318) at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
解决:pom 文件中去掉和 hadoop 相关的依赖就好了
作业问题
(1)org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
该异常几乎都是由于程序业务逻辑有误,或者数据流里存在未处理好的脏数据导致的,继续向下追溯异常栈一般就可以看到具体的出错原因,比较常见的如POJO内有空字段,或者抽取事件时间的时间戳为null等。
(2) java.lang.IllegalStateException: Buffer pool is destroyed || Memory manager has been shut down
很多童鞋拿着这两条异常信息来求助,但实际上它们只是表示BufferPool、MemoryManager这些Flink运行时组件被销毁,亦即作业已经失败。具体的原因多种多样,根据经验,一般是上一条描述的情况居多(即Could not forward element to next operator错误会伴随出现),其次是JDK版本问题。具体情况还是要根据TaskManager日志具体分析。
(3) akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://...]] after [10000 ms]
Akka超时导致,一般有两种原因:一是集群负载比较大或者网络比较拥塞,二是业务逻辑同步调用耗时的外部服务。如果负载或网络问题无法彻底缓解,需考虑调大akka.ask.timeout参数的值(默认只有10秒);另外,调用外部服务时尽量异步操作(Async I/O)。
(4) java.io.IOException: Too many open files
这个异常我们应该都不陌生,首先检查系统ulimit -n的文件描述符限制,再注意检查程序内是否有资源(如各种连接池的连接)未及时释放。值得注意的是,Flink使用RocksDB状态后端也有可能会抛出这个异常,此时需修改flink-conf.yaml中的state.backend.rocksdb.files.open参数,如果不限制,可以改为-1。
(5) org.apache.flink.api.common.function.InvalidTypesException: The generic type parameters of '< class>' are missing
在Flink内使用Java Lambda表达式时,由于类型擦除造成的副作用,注意调用returns()方法指定被擦除的类型。
(6)Checkpoint失败:Checkpoint expired before completing
原因是因为checkpointConf.setCheckpointTimeout(8000L)。设置的太小了,默认是10min,这里只设置了8sec。当一个Flink App背压的时候(例如由外部组件异常引起),Barrier会流动的非常缓慢,导致Checkpoint时长飙升。
检查点和状态问题
(1) Received checkpoint barrier for checkpoint < cp_id> before completing current checkpoint < cp_id>. Skipping current checkpoint
在当前检查点还未做完时,收到了更新的检查点的barrier,表示当前检查点不再需要而被取消掉,一般不需要特殊处理。
(2) Checkpoint < cp_id> expired before completing
首先应检查CheckpointConfig.setCheckpointTimeout()方法设定的检查点超时,如果设的太短,适当改长一点。另外就是考虑发生了反压或数据倾斜,或者barrier对齐太慢。
(3) org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible
我们知道Flink的状态是按key组织并保存的,如果程序逻辑内改了keyBy()逻辑或者key的序列化逻辑,就会导致检查点/保存点的数据无法正确恢复。所以如果必须要改key相关的东西,就弃用之前的状态数据吧。
(4) org.apache.flink.util.StateMigrationException: The new serializer for a MapState requires state migration in order for the job to proceed. However, migration for MapState currently isn't supported
在1.9之前的Flink版本中,如果我们使用RocksDB状态后端,并且更改了自用MapState的schema,恢复作业时会抛出此异常,表示不支持更改schema。这个问题已经在FLINK-11947解决,升级版本即可。
(5)时钟不同步导致无法启动
启动Flink任务的时候报错 Caused by: java.lang.RuntimeException: Couldn't deploy Yarn cluster。
然后仔细看发现:system times on machines may be out of sync。
意思说是机器上的系统时间可能不同步。同步集群机器时间即可。
推荐系统之标签体系
漫说数据湖——如何建湖?如何做数据ETL?为什么大数据需要数据湖?
小米用户画像实战(附48页PPT)
更文不易,点个“在看”支持一下👇